[AWS Glue]ジョブブックマークの動作を確認してみた
こんにちは、CX事業本部の若槻です。
AWS Glueでは、ジョブブックマークという機能を使用することにより、最後のジョブ実行以降の増分データのみを処理対象とすることが可能です。
今回は、ジョブブックマークが有効なGlueジョブで、前回のジョブ実行で処理済みのパーティションやS3オブジェクトに増分データが追加された場合の次回のGlueジョブでの動作を確認してみました。
確認すること
具体的は、AWSドキュメントに記載されている下記仕様通りの動作となるかを実際に確認してみます。
S3 に保存されている処理対象のファイルを識別するため、ジョブブックマークはファイル名ではなくオブジェクトの最終変更時刻を確認します。ジョブが最後に実行されてから入力オブジェクトが変更された場合、ジョブが再度実行されるときに入力オブジェクトが再処理されます。
AWS Glue データカタログ内の特定のパーティションの作成タイムスタンプが、ジョブブックマークによってキャプチャされた最後のジョブ実行のタイムスタンプよりも古い場合、パーティションがスキップされます。
確認してみた
環境作成
動作確認環境を作成します。
CloudFormationスタック
AWSTemplateFormatVersion: '2010-09-09' Resources: DevicesRawDataBucket: Type: AWS::S3::Bucket Properties: BucketName: !Sub devices-raw-data-${AWS::AccountId}-${AWS::Region} DevicesDataAnalyticsBucket: Type: AWS::S3::Bucket Properties: BucketName: !Sub devices-data-analytics-${AWS::AccountId}-${AWS::Region} DevicesDataAnalyticsGlueDatabase: Type: AWS::Glue::Database Properties: CatalogId: !Ref AWS::AccountId DatabaseInput: Name: devices_data_analystics RawDataGlueTable: Type: AWS::Glue::Table Properties: CatalogId: !Ref AWS::AccountId DatabaseName: !Ref DevicesDataAnalyticsGlueDatabase TableInput: Name: devices_raw_data TableType: EXTERNAL_TABLE Parameters: has_encrypted_data: false serialization.encoding: utf-8 EXTERNAL: true StorageDescriptor: OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat Columns: - Name: device_id Type: string - Name: timestamp Type: bigint - Name: state Type: boolean InputFormat: org.apache.hadoop.mapred.TextInputFormat Location: !Sub s3://${DevicesRawDataBucket}/raw-data SerdeInfo: Parameters: paths: "device_id, timestamp, state" SerializationLibrary: org.apache.hive.hcatalog.data.JsonSerDe PartitionKeys: - Name: year Type: string - Name: month Type: string - Name: day Type: string IntegratedDataGlueTable: Type: AWS::Glue::Table Properties: CatalogId: !Ref AWS::AccountId DatabaseName: !Ref DevicesDataAnalyticsGlueDatabase TableInput: Name: devices_integrated_data TableType: EXTERNAL_TABLE Parameters: has_encrypted_data: false serialization.encoding: utf-8 EXTERNAL: true StorageDescriptor: OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat Columns: - Name: device_id Type: string - Name: timestamp Type: bigint - Name: state Type: boolean - Name: year Type: string - Name: month Type: string - Name: day Type: string InputFormat: org.apache.hadoop.mapred.TextInputFormat Location: !Sub s3://${DevicesDataAnalyticsBucket}/integrated-data SerdeInfo: Parameters: paths: "device_id, timestamp, state, year, month, day" SerializationLibrary: org.apache.hive.hcatalog.data.JsonSerDe ExecuteETLJobRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - glue.amazonaws.com Action: - sts:AssumeRole Policies: - PolicyName: devices-data-etl-glue-job-policy PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: - glue:StartJobRun Resource: - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:job/devices-data-etl - Effect: Allow Action: - glue:GetPartition - glue:GetPartitions - glue:GetTable Resource: - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/${DevicesDataAnalyticsGlueDatabase} - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/${DevicesDataAnalyticsGlueDatabase}/${RawDataGlueTable} - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/${DevicesDataAnalyticsGlueDatabase}/${IntegratedDataGlueTable} - Effect: Allow Action: - glue:GetJobBookmark Resource: - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/${DevicesDataAnalyticsGlueDatabase}/${RawDataGlueTable} - Effect: Allow Action: - s3:ListBucket - s3:GetBucketLocation Resource: - arn:aws:s3:::* - Effect: Allow Action: - logs:CreateLogStream - logs:CreateLogGroup - logs:PutLogEvents Resource: !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws-glue/jobs/* - Effect: Allow Action: - s3:GetObject Resource: - !Sub arn:aws:s3:::${DevicesRawDataBucket}/raw-data/* - !Sub arn:aws:s3:::${DevicesDataAnalyticsBucket}/glue-job-script/devices-data-etl.py - Effect: Allow Action: - s3:GetObject - s3:PutObject Resource: - !Sub arn:aws:s3:::${DevicesDataAnalyticsBucket}/glue-job-temp-dir/* - Effect: Allow Action: - s3:PutObject Resource: - !Sub arn:aws:s3:::${DevicesDataAnalyticsBucket}/integrated-data/* DevicesDataETLGlueJob: Type: AWS::Glue::Job Properties: Name: devices-data-etl Command: Name: glueetl PythonVersion: 3 ScriptLocation: !Sub s3://${DevicesDataAnalyticsBucket}/glue-job-script/devices-data-etl.py DefaultArguments: --job-language: python --job-bookmark-option: job-bookmark-enable --TempDir: !Sub s3://${DevicesDataAnalyticsBucket}/glue-job-temp-dir --GLUE_DATABASE_NAME: !Sub ${DevicesDataAnalyticsGlueDatabase} --SRC_GLUE_TABLE_NAME: !Sub ${RawDataGlueTable} --DEST_GLUE_TABLE_NAME: !Sub ${IntegratedDataGlueTable} GlueVersion: 2.0 ExecutionProperty: MaxConcurrentRuns: 1 MaxRetries: 0 Role: !Ref ExecuteETLJobRole
データソースとなるGlueデータカタログのリソース定義です。パーティションキーはyear
、month
、day
となります。
RawDataGlueTable: Type: AWS::Glue::Table Properties: CatalogId: !Ref AWS::AccountId DatabaseName: !Ref DevicesDataAnalyticsGlueDatabase TableInput: Name: devices_raw_data TableType: EXTERNAL_TABLE Parameters: has_encrypted_data: false serialization.encoding: utf-8 EXTERNAL: true StorageDescriptor: OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat Columns: - Name: device_id Type: string - Name: timestamp Type: bigint - Name: state Type: boolean InputFormat: org.apache.hadoop.mapred.TextInputFormat Location: !Sub s3://${DevicesRawDataBucket}/raw-data SerdeInfo: Parameters: paths: "device_id, timestamp, state" SerializationLibrary: org.apache.hive.hcatalog.data.JsonSerDe PartitionKeys: - Name: year Type: string - Name: month Type: string - Name: day Type: string
Glueジョブのリソース定義です。ジョブブックマークを有効にする場合はjob-bookmark-option
をjob-bookmark-enable
とする必要があります。
DevicesDataETLGlueJob: Type: AWS::Glue::Job Properties: Name: devices-data-etl Command: Name: glueetl PythonVersion: 3 ScriptLocation: !Sub s3://${DevicesDataAnalyticsBucket}/glue-job-script/devices-data-etl.py DefaultArguments: --job-language: python --job-bookmark-option: job-bookmark-enable --TempDir: !Sub s3://${DevicesDataAnalyticsBucket}/glue-job-temp-dir --GLUE_DATABASE_NAME: !Sub ${DevicesDataAnalyticsGlueDatabase} --SRC_GLUE_TABLE_NAME: !Sub ${RawDataGlueTable} --DEST_GLUE_TABLE_NAME: !Sub ${IntegratedDataGlueTable} GlueVersion: 2.0 ExecutionProperty: MaxConcurrentRuns: 1 MaxRetries: 0 Role: !Ref ExecuteETLJobRole
Glueジョブスクリプト
GlueジョブのPySparkスクリプトは下記のようになります。今回は処理内容ではなく処理対象のデータが何であるかを確認したいので、データソースから取得したデータを変更を加えずデータターゲットに書き込むだけの処理内容となっています。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.dynamicframe import DynamicFrame args = getResolvedOptions( sys.argv, [ 'JOB_NAME', 'GLUE_DATABASE_NAME', 'SRC_GLUE_TABLE_NAME', 'DEST_GLUE_TABLE_NAME' ] ) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) df = glueContext.create_dynamic_frame.from_catalog( database = args['GLUE_DATABASE_NAME'], table_name = args['SRC_GLUE_TABLE_NAME'], transformation_ctx = 'datasource' ).toDF() df.show() dyf = DynamicFrame.fromDF(df, glueContext, 'integrated_data' ) glueContext.write_dynamic_frame.from_catalog( frame = dyf, database = args['GLUE_DATABASE_NAME'], table_name = args['DEST_GLUE_TABLE_NAME'], transformation_ctx = 'datasink' ) job.commit()
データソースからのデータ取得時にジョブブックマークを使用する場合は、create_dynamic_frame.from_catalog()
でtransformation_ctx
オプションを指定します。
For job bookmarks to work properly, enable the job bookmark parameter and set the transformation_ctx parameter. If you don't pass in the transformation_ctx parameter, then job bookmarks are not enabled for a dynamic frame or a table used in the method.
df = glueContext.create_dynamic_frame.from_catalog( database = args['GLUE_DATABASE_NAME'], table_name = args['SRC_GLUE_TABLE_NAME'], transformation_ctx = 'datasource' ).toDF()
デプロイ
以降のコマンド実行で使用する変数を定義します。
% AWS_REGION=ap-northeast-1 % ACCOUNT_ID=$(aws sts get-caller-identity | jq -r ".Account") % RAW_DATA_BUCKET=s3://devices-raw-data-${ACCOUNT_ID}-${AWS_REGION} % DATA_ANALYTICS_BUCKET=s3://devices-data-analytics-${ACCOUNT_ID}-${AWS_REGION}
CloudFormationスタックをデプロイします。
% aws cloudformation deploy \ --template-file template.yaml \ --stack-name devices-data-analytics-stack \ --capabilities CAPABILITY_NAMED_IAM \ --no-fail-on-empty-changeset
GlueジョブのスクリプトをS3バケットにアップロードします。
% aws s3 cp devices-data-etl.py \ ${DATA_ANALYTICS_BUCKET}/glue-job-script/devices-data-etl.py
動作確認
確認1:新規パーティション・新規オブジェクト
Glueジョブで未処理の新規パーティションに新規オブジェクトとしてデータが作成された場合のジョブ実行時の動作を確認してみます。(後続の確認2,3と比較のため)
データが記載されたオブジェクトraw-data-A.json
をS3バケットのパーティションパスに作成します。
{"device_id": "3ff9c44a", "timestamp": 1609348014, "state": true}
% aws s3 cp raw-data-1.json \ ${RAW_DATA_BUCKET}/raw-data/year=2021/month=01/day=06/raw-data-A.json
データカタログのパーティションを更新します。
% aws athena start-query-execution \ --query-string "MSCK REPAIR TABLE ${GLUE_DATABASE_NAME}.${RAW_DATA_GLUE_TABLE_NAME}" \ --work-group primary
ジョブを実行します。
% aws glue start-job-run --job-name devices-data-etl
スクリプト内でのdf.show()
の出力を見ると、下記のように追加したデータが過不足なく取得できていることが分かります。
>>> df.show() +---------+----------+-----+----+-----+---+ |device_id| timestamp|state|year|month|day| +---------+----------+-----+----+-----+---+ | 3ff9c44a|1609348014| true|2021| 01| 06| +---------+----------+-----+----+-----+---+
データターゲットのデータカタログからデータを取得してみます。
SELECT * FROM "devices_data_analystics"."devices_integrated_data"
データソースから取得されたデータがデータターゲットにロードされています。
確認2:処理済みパーティション・新規オブジェクト
前回のGlueジョブで処理済みのパーティションに新規オブジェクトとしてデータが作成された場合のジョブ実行時の動作を確認してみます。
データが記載されたオブジェクトraw-data-B.json
をS3バケットの前回と同じパーティションパスに作成します。
{"device_id": "e36b7dfa", "timestamp": 1609375822, "state": true}
% aws s3 cp raw-data-2.json \ ${RAW_DATA_BUCKET}/raw-data/year=2021/month=01/day=06/raw-data-B.json
ジョブを実行します。
% aws glue start-job-run --job-name devices-data-etl
スクリプト内でのdf.show()
の出力を見ると、下記のように追加したデータが過不足なく取得できていることが分かります。
>>> df.show() +---------+----------+-----+----+-----+---+ |device_id| timestamp|state|year|month|day| +---------+----------+-----+----+-----+---+ | e36b7dfa|1609375822| true|2021| 01| 06| +---------+----------+-----+----+-----+---+
データターゲットのデータカタログからデータを取得してみます。
SELECT * FROM "devices_data_analystics"."devices_integrated_data"
データソースから取得された追加のデータ(device_id=e36b7dfa
)がデータターゲットにロードされています。
確認3:処理済みパーティション・処理済みオブジェクト
前回のGlueジョブで処理済みのオブジェクトに追記によりデータが追加された場合のジョブ実行時の動作を確認してみます。
前回処理したものと同じ名前で、2行目にデータが追記されたオブジェクトraw-data-B.json
を、S3バケットの前回と同じパーティションパスに作成します。
{"device_id": "e36b7dfa", "timestamp": 1609375822, "state": true} {"device_id": "7d4215d0", "timestamp": 1609497057, "state": false}
% aws s3 cp raw-data-3.json \ ${RAW_DATA_BUCKET}/raw-data/year=2021/month=01/day=06/raw-data-B.json
ジョブを実行します。
% aws glue start-job-run --job-name devices-data-etl
スクリプト内でのdf.show()
の出力を見ると、下記のように今回追記したデータに加えて、前回処理したデータが重複して取得されていることが分かります。
>>> df.show() +---------+----------+-----+----+-----+---+ |device_id| timestamp|state|year|month|day| +---------+----------+-----+----+-----+---+ | e36b7dfa|1609375822| true|2021| 01| 06| | 7d4215d0|1609497057|false|2021| 01| 06| +---------+----------+-----+----+-----+---+
データターゲットのデータカタログからデータを取得してみます。
SELECT * FROM "devices_data_analystics"."devices_integrated_data"
今回はデータターゲットには追加のデータがロードされていませんでした。
この動作は、ジョブブックマークはデータソースからのデータ取得時だけでなく、データターゲットへのデータロード時にも適用させることができるためです。
データターゲットへのデータロード時にジョブブックマークを使用する場合も、write_dynamic_frame.from_catalog()
でtransformation_ctx
オプションを指定します。
glueContext.write_dynamic_frame.from_catalog( frame = dyf, database = args['GLUE_DATABASE_NAME'], table_name = args['DEST_GLUE_TABLE_NAME'], transformation_ctx = 'datasink' )
まとめ
検証結果をまとめると下記の通りとなりました。(データソースからのデータ取得、データターゲットへのデータロードのいずれでもジョブブックマークを有効にした場合)冒頭で示したAWSドキュメント通りの仕様となりました。
- 前回までのジョブで処理済みのパーティション内に作成されたオブジェクトのデータは、データソースからの取得、データターゲットへのロードの対象となる。
- 前回までのジョブで処理済みのオブジェクトにデータが追記された場合は、そのオブジェクトのデータはデータソースからの取得の対象となる(ジョブ内でのデータの重複が発生する)が、データターゲットへのロードの対象とはならない(データの欠損が発生する)。
ちなみに、Amazon Kinesis Data FirehoseによりS3バケットへ配信されたデータをGlueジョブのデータソースとする場合は、レコードが重複なく保持されているバッファごとにランダムな名前のオブジェクトが作成されるので、2.の仕様を懸念する必要はなさそうです。
The frequency of data delivery to Amazon S3 is determined by the Amazon S3 Buffer size and Buffer interval value that you configured for your delivery stream. Kinesis Data Firehose buffers incoming data before it delivers it to Amazon S3.
The Amazon S3 object name follows the pattern DeliveryStreamName-DeliveryStreamVersion-YYYY-MM-dd-HH-MM-SS-RandomString,
おわりに
ジョブブックマークが有効なGlueジョブで、前回のジョブ実行で処理済みのパーティションやS3オブジェクトに増分データが追加された際の動作を確認してみました。
このようにGlueのジョブブックマークはデータの処理履歴を裏側でよろしく管理してくれてとても便利なのでGlueジョブを使う際は是非とも有効活用したいですね。
参考
以上